﻿/**

 * Copyright (c) 2015-2016, FastDev 刘强 (fastdev@163.com) & Quincy.

 *

 * Licensed under the Apache License, Version 2.0 (the "License");

 * you may not use this file except in compliance with the License.

 * You may obtain a copy of the License at

 *

 *      http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */


using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using OF.Notify.DataHost.Cluster.Entity;
using OF.Notify.Entity;

namespace OF.Notify.DataHost.Cluster.Disk
{
    public class DiskCollectionOnlineMessageStore
    {
        internal int clusterId;
        internal int collectionId;
        internal FileStream fs = null;             
        internal int nodeId = 0;
        internal string filePath = null;
        internal HeadTailLink<TopicMessage> onLineMessageList = new HeadTailLink<TopicMessage>();
		internal TopicClusterContext topicContext;
		internal byte topicEnum;

        public static bool EnableDeleteCollectionFile = true;

        public DiskCollectionOnlineMessageStore(byte topicEnum, int nodeId, int clusterId, int collectionId)
        {
			this.topicEnum = topicEnum;
			topicContext = ClusterContext.Get().GetTopicContext(topicEnum);
            this.nodeId = nodeId;
            this.clusterId = clusterId;
            this.collectionId = collectionId;
        }

        public List<TopicMessage> GetOnLineMessageList()
        {
            List<TopicMessage> result = null;
            ClusterContext.StringLock.Invoke(filePath, () => {
                result = onLineMessageList.GetList();
            });
            return result;
        }

        public bool TransferOnlineMessagesToConsumer(ConsumerTopicDataNode consumerTopicDataNode)
        {
            bool isTransfered = false;
            ClusterContext.StringLock.Invoke(filePath, () => {
                isTransfered = consumerTopicDataNode.AddOnlineMessgeListToHandle(this.clusterId, this.collectionId, onLineMessageList);
            });
            return isTransfered;
        }

        public void InitSyncedCollection()
        {
            var context = ClusterContext.Get();
            filePath = topicContext.GetMessagePath(this.nodeId, this.collectionId);
            DiskVisitor.EnsureFileDirectoryExists(filePath);
            //Util.LogInfo("Open File:" + filePath);
            fs = new FileStream(filePath, FileMode.Append);
        }

        public int GetCollectionId()
        {
            return this.collectionId;
        }

        public void SaveOnlineMessage(TopicMessage msg)
        {
            ClusterContext.StringLock.Invoke(filePath, () => {
                onLineMessageList.Add(new LinkItem<TopicMessage> { Data = msg });
                var messgeContent = msg.GetContent();
                fs.Write(System.BitConverter.GetBytes((UInt16)(messgeContent.Length)), 0, 2);
                fs.Write(messgeContent, 0, messgeContent.Length);
                fs.Flush();
            });
        }
        
        public void DoDispose()
        {
            if (fs != null)
            {
                ClusterContext.StringLock.Invoke(filePath, () =>
                {
                    if (fs != null)
                    {
                        fs.Flush(true);
                        fs.Close();
                        fs = null;
                        //Util.LogInfo("Close File:" + filePath);
                    }
                });
            }
            onLineMessageList.IsFull = true;
        }

        public static void Touch(byte topicEnum, int nodeId, int collectionId)
        {
            string messageFilePath = ClusterContext.Get().GetTopicContext(topicEnum).GetMessagePath(nodeId, collectionId);
            DiskVisitor.SafeCreateFile(messageFilePath, null);
        }

        public static void Delete(byte topicEnum, int nodeId, int collectionId)
        {
            if (EnableDeleteCollectionFile)
            {
                string messageFilePath = ClusterContext.Get().GetTopicContext(topicEnum).GetMessagePath(nodeId, collectionId);
                DiskVisitor.SafeDeleteFile(messageFilePath);
            }
        }

        public static bool Contains(byte topicEnum, int nodeId, int collectionId)
        { 
            string messageFilePath = ClusterContext.Get().GetTopicContext(topicEnum).GetMessagePath(nodeId, collectionId);
            return File.Exists(messageFilePath);
        }

        public static List<TopicMessage> GetMessages(byte topicEnum, int nodeId, int collectionId)
        {
            var context = ClusterContext.Get();
            string messageFilePath = ClusterContext.Get().GetTopicContext(topicEnum).GetMessagePath(nodeId, collectionId);
            if (File.Exists(messageFilePath))
            {
                byte[] btArray = null;
                ClusterContext.StringLock.Invoke(messageFilePath, () => {
                    using (FileStream fs = new FileStream(messageFilePath, FileMode.Open, FileAccess.Read))
                    {
                        btArray = new byte[fs.Length];
                        fs.Read(btArray, 0, btArray.Length);
                    }
                });
                List<TopicMessage> result = new List<TopicMessage>();
                int visitI = 0;
                while (visitI < btArray.Length)
                { 
                    byte[] tempArr = new byte[2];
                    Array.Copy(btArray, visitI, tempArr, 0, 2);
                    UInt16 contentLegnth = BitConverter.ToUInt16(tempArr, 0);
                    visitI += 2;

                    byte[] content = new byte[contentLegnth];
                    Array.Copy(btArray, visitI, content, 0, contentLegnth);
                    visitI += contentLegnth;
                    result.Add(TopicMessage.Parse(topicEnum, content));
                }
                return result;
            }
            else
            {
                return null;
            }
        }
    }
}
 